/**
 * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
 *
 * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com)
 *     - for: redis array reply support
 *
 * Copyright (C) 2017 Carsten Bock (ng-voice GmbH)
 *     - for: Cluster support
 *
 * This file is part of Kamailio, a free SIP server.
 *
 * Kamailio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version
 *
 * Kamailio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 *
 */

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <stdarg.h>

#include "../../core/mem/mem.h"
#include "../../core/dprint.h"
#include "../../core/hashes.h"
#include "../../core/ut.h"

#include "redis_client.h"

#define redisCommandNR(a...)        \
	(int)({                         \
		void *__tmp;                \
		__tmp = redisCommand(a);    \
		if(__tmp) {                 \
			freeReplyObject(__tmp); \
		};                          \
		__tmp ? 0 : -1;             \
	})

static redisc_server_t *_redisc_srv_list = NULL;

static redisc_reply_t *_redisc_rpl_list = NULL;

extern int init_without_redis;
extern int redis_connect_timeout_param;
extern int redis_cmd_timeout_param;
extern int redis_cluster_param;
extern int redis_disable_time_param;
extern int redis_allowed_timeouts_param;
extern int redis_flush_on_reconnect_param;
extern int redis_allow_dynamic_nodes_param;
extern int ndb_redis_debug;
#ifdef WITH_SSL
extern char *ndb_redis_ca_path;
#endif

/* backwards compatibility with hiredis < 0.12 */
#if(HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
typedef char *sds;
sds sdscatlen(sds s, const void *t, size_t len);
int redis_append_formatted_command(
		redisContext *c, const char *cmd, size_t len);
#else
#define redis_append_formatted_command redisAppendFormattedCommand
#endif

/**
 *
 */
int redisc_init(void)
{
	char addr[256], pass[256], unix_sock_path[256], sentinel_group[256];

	unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1;
#ifdef WITH_SSL
	unsigned int enable_ssl = 0;
#endif
	int i, row;
	redisc_server_t *rsrv = NULL;
	param_t *pit = NULL;
	struct timeval tv_conn;
	struct timeval tv_cmd;

	tv_conn.tv_sec = (int)redis_connect_timeout_param / 1000;
	tv_conn.tv_usec = (int)(redis_connect_timeout_param % 1000) * 1000;

	tv_cmd.tv_sec = (int)redis_cmd_timeout_param / 1000;
	tv_cmd.tv_usec = (int)(redis_cmd_timeout_param % 1000) * 1000;

	if(_redisc_srv_list == NULL) {
		LM_ERR("no redis servers defined\n");
		return -1;
	}

	for(rsrv = _redisc_srv_list; rsrv; rsrv = rsrv->next) {
		char sentinels[MAXIMUM_SENTINELS][256];
		uint8_t sentinels_count = 0;

		port = 6379;
		db = 0;
		haspass = 0;
		sock = 0;

		memset(addr, 0, sizeof(addr));
		memset(pass, 0, sizeof(pass));
		memset(unix_sock_path, 0, sizeof(unix_sock_path));

		for(pit = rsrv->attrs; pit; pit = pit->next) {
			if(pit->name.len == 4 && strncmp(pit->name.s, "unix", 4) == 0) {
				snprintf(unix_sock_path, sizeof(unix_sock_path) - 1, "%.*s",
						pit->body.len, pit->body.s);
				sock = 1;
			} else if(pit->name.len == 4
					  && strncmp(pit->name.s, "addr", 4) == 0) {
				snprintf(addr, sizeof(addr) - 1, "%.*s", pit->body.len,
						pit->body.s);
			} else if(pit->name.len == 4
					  && strncmp(pit->name.s, "port", 4) == 0) {
				if(str2int(&pit->body, &port) < 0)
					port = 6379;
			} else if(pit->name.len == 2
					  && strncmp(pit->name.s, "db", 2) == 0) {
				if(str2int(&pit->body, &db) < 0)
					db = 0;
			} else if(pit->name.len == 4
					  && strncmp(pit->name.s, "pass", 4) == 0) {
				snprintf(pass, sizeof(pass) - 1, "%.*s", pit->body.len,
						pit->body.s);
				haspass = 1;
#ifdef WITH_SSL
			} else if(pit->name.len == 3
					  && strncmp(pit->name.s, "tls", 3) == 0) {
				snprintf(pass, sizeof(pass) - 1, "%.*s", pit->body.len,
						pit->body.s);
				if(str2int(&pit->body, &enable_ssl) < 0)
					enable_ssl = 0;
#endif
			} else if(pit->name.len == 14
					  && strncmp(pit->name.s, "sentinel_group", 14) == 0) {
				snprintf(sentinel_group, sizeof(sentinel_group) - 1, "%.*s",
						pit->body.len, pit->body.s);
			} else if(pit->name.len == 15
					  && strncmp(pit->name.s, "sentinel_master", 15) == 0) {
				if(str2int(&pit->body, &sentinel_master) < 0)
					sentinel_master = 1;
			} else if(pit->name.len == 8
					  && strncmp(pit->name.s, "sentinel", 8) == 0) {
				if(sentinels_count < MAXIMUM_SENTINELS) {
					snprintf(sentinels[sentinels_count],
							sizeof(sentinels[sentinels_count]) - 1, "%.*s",
							pit->body.len, pit->body.s);
					sentinels_count++;
				} else {
					LM_ERR("too many sentinels, maximum %d supported.\n",
							MAXIMUM_SENTINELS);
					return -1;
				}
			}
		}

		// if sentinels are provided, we need to connect to them and retrieve the redis server
		// address / port
		if(sentinels_count > 0) {
			for(i = 0; i < sentinels_count; i++) {
				char *sentinelAddr = sentinels[i];
				char *pos;
				int srvfound = 0;
				redisContext *redis;
				redisReply *res, *res2;

				port = 6379;
				if((pos = strchr(sentinelAddr, ':')) != NULL) {
					port = atoi(pos + 1);
					pos[0] = '\0';
				}

				redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn);
				if(redis) {
					if(sentinel_master != 0) {
						res = redisCommand(redis,
								"SENTINEL get-master-addr-by-name %s",
								sentinel_group);
						if(res && (res->type == REDIS_REPLY_ARRAY)
								&& (res->elements == 2)) {
							strncpy(addr, res->element[0]->str,
									res->element[0]->len + 1);
							port = atoi(res->element[1]->str);
							LM_DBG("sentinel replied: %s:%d\n", addr, port);
							srvfound = 1;
						}
					} else {
						res = redisCommand(
								redis, "SENTINEL slaves %s", sentinel_group);
						if(res && (res->type == REDIS_REPLY_ARRAY)) {
							for(row = 0; row < res->elements; row++) {
								res2 = res->element[row];
								for(i = 0; i < res2->elements; i += 2) {
									if(strncmp(res2->element[i]->str, "ip", 2)
											== 0) {
										strncpy(addr, res2->element[i + 1]->str,
												res2->element[i + 1]->len);
										addr[res2->element[i + 1]->len] = '\0';
									} else if(strncmp(res2->element[i]->str,
													  "port", 4)
											  == 0) {
										port = atoi(res2->element[i + 1]->str);
										break;
									}
								}
							}
							LOG(ndb_redis_debug, "slave for %s: %s:%d\n",
									sentinel_group, addr, port);
							srvfound = 1;
						}
					}
				}
				if(srvfound == 1) {
					break;
				}
			}
		}

#ifdef WITH_SSL
 		if(enable_ssl) {
 			/* Create SSL context*/
 			redisInitOpenSSL();
 			rsrv->sslCtxRedis = redisCreateSSLContext(
					NULL, ndb_redis_ca_path, NULL, NULL, NULL, NULL);
 			if(rsrv->sslCtxRedis == NULL) {
				LM_ERR("Unable to create Redis TLS Context.\n");
 			}
  		}
#endif

		if(sock != 0) {
			LOG(ndb_redis_debug, "Connecting to unix socket: %s\n",
					unix_sock_path);
			rsrv->ctxRedis =
					redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
		} else {
#ifdef WITH_SSL
			LOG(ndb_redis_debug, "Connecting to %s %s:%d\n",
					(enable_ssl) ? "TLS" : "UDP", addr, port);
#else
			LOG(ndb_redis_debug, "Connecting to %s:%d\n", addr, port);
#endif
			rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
		}

#ifdef WITH_SSL
 		if(enable_ssl) {
 			/* Negotiate SSL/TLS handshake*/
 			redisInitiateSSLWithContext(rsrv->ctxRedis, rsrv->sslCtxRedis);
 		}
#endif

		LOG(ndb_redis_debug, "rsrv->ctxRedis = %p\n", rsrv->ctxRedis);

		if(!rsrv->ctxRedis) {
			LM_ERR("Failed to create REDIS-Context.\n");
			goto err;
		}
		if(rsrv->ctxRedis->err) {
			LM_ERR("Failed to create REDIS returned an error: %s\n",
					rsrv->ctxRedis->errstr);
			goto err2;
		}
		if((haspass != 0) && redisc_check_auth(rsrv, pass)) {
			LM_ERR("Authentication failed.\n");
			goto err2;
		}
		if(redisSetTimeout(rsrv->ctxRedis, tv_cmd)) {
			LM_ERR("Failed to set timeout.\n");
			goto err2;
		}
		if(redisCommandNR(rsrv->ctxRedis, "PING")) {
			LM_ERR("Failed to send PING (REDIS returned %s).\n",
					rsrv->ctxRedis->errstr);
			goto err2;
		}
		if((redis_cluster_param == 0)
				&& redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) {
			LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\","
				   " and not in cluster mode).\n",
					db, rsrv->ctxRedis->errstr);
			goto err2;
		}
	}

	return 0;

err2:
	if(sock != 0) {
		LM_ERR("error communicating with redis server [%.*s]"
			   " (unix:%s db:%d): %s\n",
				rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
				rsrv->ctxRedis->errstr);
	} else {
		LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
				rsrv->sname->len, rsrv->sname->s, addr, port, db,
				rsrv->ctxRedis->errstr);
	}
	if(init_without_redis == 1) {
		LM_WARN("failed to initialize redis connections, but initializing"
				" module anyway.\n");
		return 0;
	}

	return -1;
err:
	if(sock != 0) {
		LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
				rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
	} else {
		LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
				rsrv->sname->len, rsrv->sname->s, addr, port, db);
	}
	if(init_without_redis == 1) {
		LM_WARN("failed to initialize redis connections, but initializing"
				" module anyway.\n");
		return 0;
	}

	return -1;
}

/**
 *
 */
int redisc_destroy(void)
{
	redisc_reply_t *rpl, *next_rpl;

	redisc_server_t *rsrv = NULL;
	redisc_server_t *rsrv1 = NULL;

	rpl = _redisc_rpl_list;
	while(rpl != NULL) {
		next_rpl = rpl->next;
		if(rpl->rplRedis)
			freeReplyObject(rpl->rplRedis);

		if(rpl->rname.s != NULL)
			pkg_free(rpl->rname.s);

		pkg_free(rpl);
		rpl = next_rpl;
	}
	_redisc_rpl_list = NULL;

	if(_redisc_srv_list == NULL)
		return -1;
	rsrv = _redisc_srv_list;
	while(rsrv != NULL) {
		rsrv1 = rsrv;
		rsrv = rsrv->next;
		if(rsrv1->ctxRedis != NULL)
			redisFree(rsrv1->ctxRedis);
		free_params(rsrv1->attrs);
		pkg_free(rsrv1);
	}
	_redisc_srv_list = NULL;

	return 0;
}

/**
 *
 */
int redisc_add_server(char *spec)
{
	param_t *pit = NULL;
	param_hooks_t phooks;
	redisc_server_t *rsrv = NULL;
	str s;

	s.s = spec;
	s.len = strlen(spec);
	if(s.s[s.len - 1] == ';')
		s.len--;
	if(parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
		LM_ERR("failed parsing params value\n");
		goto error;
	}
	rsrv = (redisc_server_t *)pkg_malloc(sizeof(redisc_server_t));
	if(rsrv == NULL) {
		LM_ERR("no more pkg\n");
		goto error;
	}
	memset(rsrv, 0, sizeof(redisc_server_t));
	rsrv->attrs = pit;
	rsrv->spec = spec;
	for(pit = rsrv->attrs; pit; pit = pit->next) {
		if(pit->name.len == 4 && strncmp(pit->name.s, "name", 4) == 0) {
			rsrv->sname = &pit->body;
			rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len);
			break;
		}
	}
	if(rsrv->sname == NULL) {
		LM_ERR("no server name\n");
		goto error;
	}
	rsrv->next = _redisc_srv_list;
	_redisc_srv_list = rsrv;

	return 0;
error:
	if(pit != NULL)
		free_params(pit);
	if(rsrv != NULL)
		pkg_free(rsrv);
	return -1;
}

/**
 *
 */
redisc_server_t *redisc_get_server(str *name)
{
	redisc_server_t *rsrv = NULL;
	unsigned int hname;

	hname = get_hash1_raw(name->s, name->len);
	LOG(ndb_redis_debug, "Hash %u (%.*s)\n", hname, name->len, name->s);
	rsrv = _redisc_srv_list;
	while(rsrv != NULL) {
		LM_DBG("Entry %u (%.*s)\n", rsrv->hname, rsrv->sname->len,
				rsrv->sname->s);
		if(rsrv->hname == hname && rsrv->sname->len == name->len
				&& strncmp(rsrv->sname->s, name->s, name->len) == 0) {
			LOG(ndb_redis_debug, "Using entry %u (%.*s)\n", rsrv->hname,
					rsrv->sname->len, rsrv->sname->s);
			return rsrv;
		}
		rsrv = rsrv->next;
	}
	LOG(ndb_redis_debug, "No entry found.\n");
	return NULL;
}

/**
 *
 */
int redisc_reconnect_server(redisc_server_t *rsrv)
{
	char addr[256], pass[256], unix_sock_path[256], sentinel_group[256];
	unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1;
#ifdef WITH_SSL
	unsigned int enable_ssl = 0;
#endif
	char sentinels[MAXIMUM_SENTINELS][256];
	uint8_t sentinels_count = 0;
	int i, row;
	param_t *pit = NULL;
	struct timeval tv_conn;
	struct timeval tv_cmd;

	tv_conn.tv_sec = (int)redis_connect_timeout_param / 1000;
	tv_conn.tv_usec = (int)(redis_connect_timeout_param % 1000) * 1000;

	tv_cmd.tv_sec = (int)redis_cmd_timeout_param / 1000;
	tv_cmd.tv_usec = (int)(redis_cmd_timeout_param % 1000) * 1000;

	memset(addr, 0, sizeof(addr));
	port = 6379;
	db = 0;
	memset(pass, 0, sizeof(pass));
	memset(unix_sock_path, 0, sizeof(unix_sock_path));
	for(pit = rsrv->attrs; pit; pit = pit->next) {
		if(pit->name.len == 4 && strncmp(pit->name.s, "unix", 4) == 0) {
			snprintf(unix_sock_path, sizeof(unix_sock_path) - 1, "%.*s",
					pit->body.len, pit->body.s);
			sock = 1;
		} else if(pit->name.len == 4 && strncmp(pit->name.s, "addr", 4) == 0) {
			snprintf(
					addr, sizeof(addr) - 1, "%.*s", pit->body.len, pit->body.s);
		} else if(pit->name.len == 4 && strncmp(pit->name.s, "port", 4) == 0) {
			if(str2int(&pit->body, &port) < 0)
				port = 6379;
		} else if(pit->name.len == 2 && strncmp(pit->name.s, "db", 2) == 0) {
			if(str2int(&pit->body, &db) < 0)
				db = 0;
		} else if(pit->name.len == 4 && strncmp(pit->name.s, "pass", 4) == 0) {
			snprintf(
					pass, sizeof(pass) - 1, "%.*s", pit->body.len, pit->body.s);
			haspass = 1;
#ifdef WITH_SSL
 		} else if(pit->name.len == 3 && strncmp(pit->name.s, "tls", 3) == 0) {
			snprintf(
					pass, sizeof(pass) - 1, "%.*s", pit->body.len, pit->body.s);
 			if(str2int(&pit->body, &enable_ssl) < 0)
				enable_ssl = 0;
#endif
		} else if(pit->name.len == 14
				  && strncmp(pit->name.s, "sentinel_group", 14) == 0) {
			snprintf(sentinel_group, sizeof(sentinel_group) - 1, "%.*s",
					pit->body.len, pit->body.s);
		} else if(pit->name.len == 15
				  && strncmp(pit->name.s, "sentinel_master", 15) == 0) {
			if(str2int(&pit->body, &sentinel_master) < 0)
				sentinel_master = 1;
		} else if(pit->name.len == 8
				  && strncmp(pit->name.s, "sentinel", 8) == 0) {
			if(sentinels_count < MAXIMUM_SENTINELS) {
				snprintf(sentinels[sentinels_count],
						sizeof(sentinels[sentinels_count]) - 1, "%.*s",
						pit->body.len, pit->body.s);
				sentinels_count++;
			} else {
				LM_ERR("too many sentinels, maximum %d supported.\n",
						MAXIMUM_SENTINELS);
				return -1;
			}
		}
	}

	// if sentinels are provided, we need to connect to them and retrieve the redis server
	// address / port
	if(sentinels_count > 0) {
		for(i = 0; i < sentinels_count; i++) {
			char *sentinelAddr = sentinels[i];
			char *pos;
			redisContext *redis;
			redisReply *res, *res2;

			port = 6379;
			if((pos = strchr(sentinelAddr, ':')) != NULL) {
				port = atoi(pos + 1);
				pos[i] = '\0';
			}

			redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn);
			if(redis) {
				if(sentinel_master != 0) {
					res = redisCommand(redis,
							"SENTINEL get-master-addr-by-name %s",
							sentinel_group);
					if(res && (res->type == REDIS_REPLY_ARRAY)
							&& (res->elements == 2)) {
						strncpy(addr, res->element[0]->str,
								res->element[0]->len + 1);
						port = atoi(res->element[1]->str);
						LOG(ndb_redis_debug, "sentinel replied: %s:%d\n", addr,
								port);
					}
				} else {
					res = redisCommand(
							redis, "SENTINEL slaves %s", sentinel_group);
					if(res && (res->type == REDIS_REPLY_ARRAY)) {
						for(row = 0; row < res->elements; row++) {
							res2 = res->element[row];
							for(i = 0; i < res2->elements; i += 2) {
								if(strncmp(res2->element[i]->str, "ip", 2)
										== 0) {
									strncpy(addr, res2->element[i + 1]->str,
											res2->element[i + 1]->len);
									addr[res2->element[i + 1]->len] = '\0';
								} else if(strncmp(res2->element[i]->str, "port",
												  4)
										  == 0) {
									port = atoi(res2->element[i + 1]->str);
									break;
								}
							}
						}
						LOG(ndb_redis_debug, "slave for %s: %s:%d\n",
								sentinel_group, addr, port);
					}
				}
			}
		}
	}

	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
	if(rsrv->ctxRedis != NULL) {
		redisFree(rsrv->ctxRedis);
		rsrv->ctxRedis = NULL;
	}
#ifdef WITH_SSL
	if(rsrv->sslCtxRedis != NULL) {
		redisFreeSSLContext(rsrv->sslCtxRedis);
		rsrv->sslCtxRedis = NULL;
	}

	if(enable_ssl) {
		/* Create SSL context*/
		redisInitOpenSSL();
		rsrv->sslCtxRedis = redisCreateSSLContext(
				NULL, ndb_redis_ca_path, NULL, NULL, NULL, NULL);
		if(rsrv->sslCtxRedis == NULL) {
			LM_ERR("Unable to create Redis TLS Context.\n");
		}
	}
#endif

	if(sock != 0) {
		rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
	} else {
		rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
	}
#ifdef WITH_SSL
	if(enable_ssl) {
		/* Negotiate SSL/TLS handshake*/
		redisInitiateSSLWithContext(rsrv->ctxRedis, rsrv->sslCtxRedis);
	}
#endif
	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
	if(!rsrv->ctxRedis)
		goto err;
	if(rsrv->ctxRedis->err)
		goto err2;
	if((haspass) && redisc_check_auth(rsrv, pass))
		goto err2;
	if(redisSetTimeout(rsrv->ctxRedis, tv_cmd))
		goto err2;
	if(redisCommandNR(rsrv->ctxRedis, "PING"))
		goto err2;
	if((redis_cluster_param == 0)
			&& redisCommandNR(rsrv->ctxRedis, "SELECT %i", db))
		goto err2;
	if(redis_flush_on_reconnect_param)
		if(redisCommandNR(rsrv->ctxRedis, "FLUSHALL"))
			goto err2;
	return 0;

err2:
	if(sock != 0) {
		LM_ERR("error communicating with redis server [%.*s]"
			   " (unix:%s db:%d): %s\n",
				rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
				rsrv->ctxRedis->errstr);
	} else {
		LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
				rsrv->sname->len, rsrv->sname->s, addr, port, db,
				rsrv->ctxRedis->errstr);
	}
err:
	if(sock != 0) {
		LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
				rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
	} else {
		LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
				rsrv->sname->len, rsrv->sname->s, addr, port, db);
	}
	return -1;
}

/**
 *
 */
int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
{
	redisc_server_t *rsrv = NULL;
	redisc_reply_t *rpl;
	char c;
	va_list ap;

	va_start(ap, cmd);

	if(srv == NULL || cmd == NULL || res == NULL) {
		LM_ERR("invalid parameters");
		goto error_cmd;
	}
	if(srv->len == 0 || res->len == 0 || cmd->len == 0) {
		LM_ERR("invalid parameters");
		goto error_cmd;
	}
	rsrv = redisc_get_server(srv);
	if(rsrv == NULL) {
		LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
		goto error_cmd;
	}
	if(rsrv->ctxRedis == NULL) {
		LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
		goto error_cmd;
	}
	if(rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS) {
		LM_ERR("Too many pipelined commands, maximum is %d\n",
				MAXIMUM_PIPELINED_COMMANDS);
		goto error_cmd;
	}
	rpl = redisc_get_reply(res);
	if(rpl == NULL) {
		LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
		goto error_cmd;
	}
	STR_VTOZ(cmd->s[cmd->len], c);
	rsrv->piped.commands[rsrv->piped.pending_commands].len =
			redisvFormatCommand(
					&rsrv->piped.commands[rsrv->piped.pending_commands].s,
					cmd->s, ap);
	if(rsrv->piped.commands[rsrv->piped.pending_commands].len < 0) {
		LM_ERR("Invalid redis command : %s\n", cmd->s);
		goto error_cmd;
	}
	rsrv->piped.replies[rsrv->piped.pending_commands] = rpl;
	rsrv->piped.pending_commands++;

	STR_ZTOV(cmd->s[cmd->len], c);
	va_end(ap);
	return 0;

error_cmd:
	va_end(ap);
	return -1;
}


/**
 *
 */
int redisc_exec_pipelined_cmd(str *srv)
{
	redisc_server_t *rsrv = NULL;

	if(srv == NULL) {
		LM_ERR("invalid parameters");
		return -1;
	}
	if(srv->len == 0) {
		LM_ERR("invalid parameters");
		return -1;
	}
	rsrv = redisc_get_server(srv);
	if(rsrv == NULL) {
		LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
		return -1;
	}
	if(rsrv->ctxRedis == NULL) {
		LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
		return -1;
	}
	return redisc_exec_pipelined(rsrv);
}

/**
 *
 */
int redisc_create_pipelined_message(redisc_server_t *rsrv)
{
	int i;

	if(rsrv->ctxRedis->err) {
		LOG(ndb_redis_debug, "Reconnecting server because of error %d: \"%s\"",
				rsrv->ctxRedis->err, rsrv->ctxRedis->errstr);
		if(redisc_reconnect_server(rsrv)) {
			LM_ERR("unable to reconnect to REDIS server: %.*s\n",
					rsrv->sname->len, rsrv->sname->s);
			return -1;
		}
	}

	for(i = 0; i < rsrv->piped.pending_commands; i++) {
		if(redis_append_formatted_command(rsrv->ctxRedis,
				   rsrv->piped.commands[i].s, rsrv->piped.commands[i].len)
				!= REDIS_OK) {
			LM_ERR("Error while appending command %d", i);
			return -1;
		}
	}
	return 0;
}

/**
 *
 */
void redisc_free_pipelined_cmds(redisc_server_t *rsrv)
{
	int i;
	for(i = 0; i < rsrv->piped.pending_commands; i++) {
		free(rsrv->piped.commands[i].s);
		rsrv->piped.commands[i].len = 0;
	}
	rsrv->piped.pending_commands = 0;
}

/**
 *
 */
int redisc_exec_pipelined(redisc_server_t *rsrv)
{
	redisc_reply_t *rpl;
	int i;

	LM_DBG("redis server: %.*s\n", rsrv->sname->len, rsrv->sname->s);

	/* if server is disabled do nothing unless the disable time has passed */
	if(redis_check_server(rsrv)) {
		goto srv_disabled;
	}

	if(rsrv->piped.pending_commands == 0) {
		LM_WARN("call for redis_cmd without any pipelined commands\n");
		return -1;
	}
	if(rsrv->ctxRedis == NULL) {
		LM_ERR("no redis context for server: %.*s\n", rsrv->sname->len,
				rsrv->sname->s);
		goto error_exec;
	}

	/* send the commands and retrieve the first reply */
	rpl = rsrv->piped.replies[0];

	if(rpl->rplRedis != NULL) {
		/* clean up previous redis reply */
		freeReplyObject(rpl->rplRedis);
		rpl->rplRedis = NULL;
	}

	redisc_create_pipelined_message(rsrv);
	redisGetReply(rsrv->ctxRedis, (void **)&rpl->rplRedis);

	if(rpl->rplRedis == NULL) {
		/* null reply, reconnect and try again */
		if(rsrv->ctxRedis->err) {
			LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr);
		}
		if(redisc_create_pipelined_message(rsrv) == 0) {
			redisGetReply(rsrv->ctxRedis, (void **)&rpl->rplRedis);
			if(rpl->rplRedis == NULL) {
				redis_count_err_and_disable(rsrv);
				LM_ERR("Unable to read reply\n");
				goto error_exec;
			}
		} else {
			redis_count_err_and_disable(rsrv);
			goto error_exec;
		}
	}
	LM_DBG_redis_reply(rpl->rplRedis);

	/* replies are received just retrieve them */
	for(i = 1; i < rsrv->piped.pending_commands; i++) {
		rpl = rsrv->piped.replies[i];
		if(rpl->rplRedis != NULL) {
			/* clean up previous redis reply */
			freeReplyObject(rpl->rplRedis);
			rpl->rplRedis = NULL;
		}
		if(redisGetReplyFromReader(rsrv->ctxRedis, (void **)&rpl->rplRedis)
				!= REDIS_OK) {
			LM_ERR("Unable to read reply\n");
			continue;
		}
		if(rpl->rplRedis == NULL) {
			LM_ERR("Trying to read reply for command %.*s but nothing in "
				   "buffer!",
					rsrv->piped.commands[i].len, rsrv->piped.commands[i].s);
			continue;
		}
		LM_DBG_redis_reply(rpl->rplRedis);
	}
	redisc_free_pipelined_cmds(rsrv);
	rsrv->disable.consecutive_errors = 0;
	return 0;

error_exec:
	redisc_free_pipelined_cmds(rsrv);
	return -1;

srv_disabled:
	redisc_free_pipelined_cmds(rsrv);
	return -2;
}

int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv)
{
	redisc_server_t *rsrv_new;
	char buffername[100];
	unsigned int port;
	str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
	int server_len = 0;
	char spec_new[100];

	if(redis_cluster_param) {
		LM_DBG("Redis replied: \"%.*s\"\n", (int)reply->len, reply->str);
		if((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
			port = 6379;
			if(strchr(reply->str, ':') > 0) {
				tmpstr.s = strchr(reply->str, ':') + 1;
				tmpstr.len = reply->len - (tmpstr.s - reply->str);
				if(str2int(&tmpstr, &port) < 0)
					port = 6379;
				LM_DBG("Port \"%.*s\" [%i] => %i\n", tmpstr.len, tmpstr.s,
						tmpstr.len, port);
			} else {
				LM_ERR("No Port in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
						reply->str);
				return 0;
			}
			if(strchr(reply->str + 6, ' ') > 0) {
				addr.len = tmpstr.s - strchr(reply->str + 6, ' ') - 2;
				addr.s = strchr(reply->str + 6, ' ') + 1;
				LM_DBG("Host \"%.*s\" [%i]\n", addr.len, addr.s, addr.len);
			} else {
				LM_ERR("No Host in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
						reply->str);
				return 0;
			}

			memset(buffername, 0, sizeof(buffername));
			name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i",
					addr.len, addr.s, port);
			name.s = buffername;
			LOG(ndb_redis_debug, "Name of new connection: %.*s\n", name.len,
					name.s);
			rsrv_new = redisc_get_server(&name);
			if(rsrv_new) {
				LOG(ndb_redis_debug, "Reusing connection\n");
				*rsrv = rsrv_new;
				return 1;
			} else if(redis_allow_dynamic_nodes_param) {
				/* New param redis_allow_dynamic_nodes_param:
				* if set, we allow ndb_redis to add nodes that were
				* not defined explicitly in the module configuration */
				char *server_new;

				memset(spec_new, 0, sizeof(spec_new));
				/* For now the only way this can work is if
				 * the new node is accessible with default
				 * parameters for sock and db */
				server_len = snprintf(spec_new, sizeof(spec_new) - 1,
						"name=%.*s;addr=%.*s;port=%i", name.len, name.s,
						addr.len, addr.s, port);

				if(server_len < 0 || server_len > sizeof(spec_new) - 1) {
					LM_ERR("failed to print server spec string\n");
					return 0;
				}
				server_new = (char *)pkg_malloc(server_len + 1);
				if(server_new == NULL) {
					LM_ERR("Error allocating pkg mem\n");
					return 0;
				}

				strncpy(server_new, spec_new, server_len);
				server_new[server_len] = '\0';

				if(redisc_add_server(server_new) == 0) {
					rsrv_new = redisc_get_server(&name);

					if(rsrv_new) {
						*rsrv = rsrv_new;
						/* Need to connect to the new server now */
						if(redisc_reconnect_server(rsrv_new) == 0) {
							LOG(ndb_redis_debug,
									"Connected to the new server"
									" with name: %.*s\n",
									name.len, name.s);
							return 1;
						} else {
							LM_ERR("ERROR connecting to the new server with "
								   "name: %.*s\n",
									name.len, name.s);
							return 0;
						}
					} else {
						/* Adding the new node failed
						 * - cannot perform redirection */
						LM_ERR("No new connection with name (%.*s) was "
							   "created\n",
								name.len, name.s);
					}
				} else {
					LM_ERR("Could not add a new connection with name %.*s\n",
							name.len, name.s);
					pkg_free(server_new);
				}
			} else {
				LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
			}
		}
	}
	return 0;
}

/**
 *
 */
int redisc_exec(str *srv, str *res, str *cmd, ...)
{
	redisc_server_t *rsrv = NULL;
	redisc_reply_t *rpl;
	char c;
	va_list ap, ap2, ap3, ap4;
	int ret = -1;

	va_start(ap, cmd);
	va_copy(ap2, ap);
	va_copy(ap3, ap);
	va_copy(ap4, ap);

	if(srv == NULL || cmd == NULL || res == NULL) {
		LM_ERR("invalid parameters");
		goto error;
	}
	if(srv->len == 0 || res->len == 0 || cmd->len == 0) {
		LM_ERR("invalid parameters");
		goto error;
	}

	STR_VTOZ(cmd->s[cmd->len], c);

	rsrv = redisc_get_server(srv);
	if(rsrv == NULL) {
		LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
		goto error_exec;
	}

	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);

	if(rsrv->ctxRedis == NULL) {
		LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
		goto error_exec;
	}
	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);

	if(rsrv->piped.pending_commands != 0) {
		LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer."
				  " Automatically call redis_execute");
		redisc_exec_pipelined(rsrv);
	}
	/* if server is disabled do nothing unless the disable time has passed */
	if(redis_check_server(rsrv)) {
		goto srv_disabled;
	}

	rpl = redisc_get_reply(res);
	if(rpl == NULL) {
		LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
		goto error_exec;
	}
	if(rpl->rplRedis != NULL) {
		/* clean up previous redis reply */
		freeReplyObject(rpl->rplRedis);
		rpl->rplRedis = NULL;
	}

	rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap);
	if(rpl->rplRedis == NULL) {
		/* null reply, reconnect and try again */
		if(rsrv->ctxRedis->err) {
			LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr);
		}
		if(redisc_reconnect_server(rsrv) == 0) {
			rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
			if(rpl->rplRedis == NULL) {
				redis_count_err_and_disable(rsrv);
				goto error_exec;
			}
		} else {
			redis_count_err_and_disable(rsrv);
			LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len,
					srv->s);
			STR_ZTOV(cmd->s[cmd->len], c);
			goto error_exec;
		}
	}

	if(check_cluster_reply(rpl->rplRedis, &rsrv)) {
		LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
		if(rsrv->ctxRedis == NULL) {
			LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
			goto error_exec;
		}

		LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);

		if(rpl->rplRedis != NULL) {
			/* clean up previous redis reply */
			freeReplyObject(rpl->rplRedis);
			rpl->rplRedis = NULL;
		}
		rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3);
		if(rpl->rplRedis == NULL) {
			/* null reply, reconnect and try again */
			if(rsrv->ctxRedis->err) {
				LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr);
			}
			if(redisc_reconnect_server(rsrv) == 0) {
				rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4);
				if(rpl->rplRedis == NULL) {
					redis_count_err_and_disable(rsrv);
					goto error_exec;
				}
			} else {
				LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len,
						srv->s);
				STR_ZTOV(cmd->s[cmd->len], c);
				goto error_exec;
			}
		}
	}

	LM_DBG("rpl->rplRedis->type:%d\n", rpl->rplRedis->type);
	if(rpl->rplRedis->type == REDIS_REPLY_ERROR) {
		LM_ERR("Redis error:%.*s\n", (int)rpl->rplRedis->len,
				rpl->rplRedis->str);
		goto error_exec;
	}

	STR_ZTOV(cmd->s[cmd->len], c);
	rsrv->disable.consecutive_errors = 0;
	va_end(ap);
	va_end(ap2);
	va_end(ap3);
	va_end(ap4);

	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);

	return 0;

error_exec:
	STR_ZTOV(cmd->s[cmd->len], c);
	ret = -1;
	goto error;

srv_disabled:
	STR_ZTOV(cmd->s[cmd->len], c);
	ret = -2;
	goto error;

error:
	va_end(ap);
	va_end(ap2);
	va_end(ap3);
	va_end(ap4);
	return ret;
}

/**
 * Executes a redis command.
 * Command is coded using a vector of strings, and a vector of lengths.
 *
 * @param rsrv Pointer to a redis_server_t structure.
 * @param argc number of elements in the command vector.
 * @param argv vector of zero terminated strings forming the command.
 * @param argvlen vector of command string lengths or NULL.
 * @return redisReply structure or NULL if there was an error.
 */
redisReply *redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
		const size_t *argvlen)
{
	redisReply *res = NULL;

	if(rsrv == NULL) {
		LM_ERR("no redis context found for server %.*s\n",
				(rsrv) ? rsrv->sname->len : 0, (rsrv) ? rsrv->sname->s : "");
		return NULL;
	}

	LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
	if(rsrv->ctxRedis == NULL) {
		LM_ERR("no redis context found for server %.*s\n",
				(rsrv) ? rsrv->sname->len : 0, (rsrv) ? rsrv->sname->s : "");
		return NULL;
	}

	if(argc <= 0) {
		LM_ERR("invalid parameters\n");
		return NULL;
	}
	if(argv == NULL || *argv == NULL) {
		LM_ERR("invalid parameters\n");
		return NULL;
	}
again:
	res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);

	/* null reply, reconnect and try again */
	if(rsrv->ctxRedis->err) {
		LOG(ndb_redis_debug, "Redis error: %s\n", rsrv->ctxRedis->errstr);
	}

	if(res) {
		if(check_cluster_reply(res, &rsrv)) {
			freeReplyObject(res);
			LOG(ndb_redis_debug, "Retrying the command directly\n");
			goto again;
		}
		return res;
	}

	if(redisc_reconnect_server(rsrv) == 0) {
		LOG(ndb_redis_debug, "Trying to reconnect to server\n");
		res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
		if(res) {
			if(check_cluster_reply(res, &rsrv)) {
				freeReplyObject(res);
				LOG(ndb_redis_debug, "Retrying the command after reconnect\n");
				goto again;
			}
		}
	} else {
		LM_ERR("Unable to reconnect to server: %.*s\n", rsrv->sname->len,
				rsrv->sname->s);
		return NULL;
	}

	return res;
}

/**
 *
 */
redisc_reply_t *redisc_get_reply(str *name)
{
	redisc_reply_t *rpl;
	unsigned int hid;

	hid = get_hash1_raw(name->s, name->len);

	for(rpl = _redisc_rpl_list; rpl; rpl = rpl->next) {
		if(rpl->hname == hid && rpl->rname.len == name->len
				&& strncmp(rpl->rname.s, name->s, name->len) == 0)
			return rpl;
	}
	/* not found - add a new one */

	rpl = (redisc_reply_t *)pkg_malloc(sizeof(redisc_reply_t));
	if(rpl == NULL) {
		LM_ERR("no more pkg\n");
		return NULL;
	}
	memset(rpl, 0, sizeof(redisc_reply_t));
	rpl->hname = hid;
	rpl->rname.s = (char *)pkg_malloc(name->len + 1);
	if(rpl->rname.s == NULL) {
		LM_ERR("no more pkg.\n");
		pkg_free(rpl);
		return NULL;
	}
	strncpy(rpl->rname.s, name->s, name->len);
	rpl->rname.len = name->len;
	rpl->rname.s[name->len] = '\0';
	rpl->next = _redisc_rpl_list;
	_redisc_rpl_list = rpl;
	return rpl;
}


/**
 *
 */
int redisc_free_reply(str *name)
{
	redisc_reply_t *rpl;
	unsigned int hid;

	if(name == NULL || name->len == 0) {
		LM_ERR("invalid parameters");
		return -1;
	}

	hid = get_hash1_raw(name->s, name->len);

	rpl = _redisc_rpl_list;
	while(rpl) {

		if(rpl->hname == hid && rpl->rname.len == name->len
				&& strncmp(rpl->rname.s, name->s, name->len) == 0) {
			if(rpl->rplRedis) {
				freeReplyObject(rpl->rplRedis);
				rpl->rplRedis = NULL;
			}

			return 0;
		}

		rpl = rpl->next;
	}

	/* reply entry not found. */
	LOG(ndb_redis_debug, "reply entry not found: %.*s\n", name->len, name->s);
	return -1;
}

int redisc_check_auth(redisc_server_t *rsrv, char *pass)
{
	redisReply *reply;
	int retval = 0;

	reply = redisCommand(rsrv->ctxRedis, "AUTH %s", pass);
	if(!reply) {
		LM_ERR("Redis authentication error\n");
		return -1;
	}
	if(reply->type == REDIS_REPLY_ERROR) {
		LM_ERR("Redis authentication error\n");
		retval = -1;
	}
	freeReplyObject(reply);
	return retval;
}

/* backwards compatibility with hiredis < 0.12 */
#if(HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
{
	sds newbuf;

	newbuf = sdscatlen(c->obuf, cmd, len);
	if(newbuf == NULL) {
		c->err = REDIS_ERR_OOM;
		strcpy(c->errstr, "Out of memory");
		return REDIS_ERR;
	}
	c->obuf = newbuf;
	return REDIS_OK;
}
#endif

int redis_check_server(redisc_server_t *rsrv)
{

	if(rsrv->disable.disabled) {
		if(get_ticks() > rsrv->disable.restore_tick) {
			LM_NOTICE("REDIS server %.*s re-enabled", rsrv->sname->len,
					rsrv->sname->s);
			rsrv->disable.disabled = 0;
			rsrv->disable.consecutive_errors = 0;
		} else {
			return 1;
		}
	}
	return 0;
}

int redis_count_err_and_disable(redisc_server_t *rsrv)
{
	if(redis_allowed_timeouts_param < 0) {
		return 0;
	}

	rsrv->disable.consecutive_errors++;
	if(rsrv->disable.consecutive_errors > redis_allowed_timeouts_param) {
		rsrv->disable.disabled = 1;
		rsrv->disable.restore_tick = get_ticks() + redis_disable_time_param;
		LM_WARN("REDIS server %.*s disabled for %d seconds", rsrv->sname->len,
				rsrv->sname->s, redis_disable_time_param);
		return 1;
	}
	return 0;
}

void print_redis_reply(int log_level, redisReply *rpl, int offset)
{
	int i;
	char padding[MAXIMUM_NESTED_KEYS + 1];

	if(!is_printable(log_level))
		return;

	if(!rpl) {
		LM_ERR("Unexpected null reply");
		return;
	}

	if(offset > MAXIMUM_NESTED_KEYS) {
		LM_ERR("Offset is too big");
		return;
	}

	for(i = 0; i < offset; i++) {
		padding[i] = '\t';
	}
	padding[offset] = '\0';

	switch(rpl->type) {
		case REDIS_REPLY_STRING:
			LOG(log_level, "%sstring reply: [%s]", padding, rpl->str);
			break;
		case REDIS_REPLY_INTEGER:
			LOG(log_level, "%sinteger reply: %lld", padding, rpl->integer);
			break;
		case REDIS_REPLY_ARRAY:
			LOG(log_level, "%sarray reply with %d elements", padding,
					(int)rpl->elements);
			for(i = 0; i < rpl->elements; i++) {
				LOG(log_level, "%selement %d:", padding, i);
				print_redis_reply(log_level, rpl->element[i], offset + 1);
			}
			break;
		case REDIS_REPLY_NIL:
			LOG(log_level, "%snil reply", padding);
			break;
		case REDIS_REPLY_STATUS:
			LOG(log_level, "%sstatus reply: %s", padding, rpl->str);
			break;
		case REDIS_REPLY_ERROR:
			LOG(log_level, "%serror reply: %s", padding, rpl->str);
			break;
	}
}
